Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2388 Improved Bulk Write API. #1884

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

qingyang-hu
Copy link
Collaborator

GODRIVER-2388
GODRIVER-3348
GODRIVER-3349
GODRIVER-3364

Summary

Improved Bulk Write API.

Background & Motivation

Refactor the (Operation).createWireMessage() to support the bulk write batching.

@mongodb-drivers-pr-bot mongodb-drivers-pr-bot bot added the priority-3-low Low Priority PR for Review label Nov 5, 2024
Copy link
Contributor

API Change Report

./v2/mongo

compatible changes

(*Client).BulkWrite: added
ClientBulkWriteException: added
ClientBulkWriteResult: added
ClientDeleteManyModel: added
ClientDeleteOneModel: added
ClientDeleteResult: added
ClientInsertOneModel: added
ClientInsertResult: added
ClientReplaceOneModel: added
ClientUpdateManyModel: added
ClientUpdateOneModel: added
ClientUpdateResult: added
ClientWriteModels: added
NewClientDeleteManyModel: added
NewClientDeleteOneModel: added
NewClientInsertOneModel: added
NewClientReplaceOneModel: added
NewClientUpdateManyModel: added
NewClientUpdateOneModel: added

./v2/mongo/options

compatible changes

ClientBulkWrite: added
ClientBulkWriteOptions: added
ClientBulkWriteOptionsBuilder: added

./v2/x/mongo/driver

incompatible changes

(*Batches).AdvanceBatch: removed
(*Batches).ClearBatch: removed
(*Batches).Valid: removed
Batches.Current: removed
##NewCursorResponse: changed from func(ResponseInfo) (CursorResponse, error) to func(./v2/x/bsonx/bsoncore.Document, ResponseInfo) (CursorResponse, error)
Operation.Batches: changed from *Batches to interface{AdvanceBatches(n int); AppendBatchArray(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error); AppendBatchSequence(dst []byte, maxCount int, maxDocSize int, totalSize int) (int, []byte, error); IsOrdered() *bool; Size() int}
##Operation.ProcessResponseFn: changed from func(ResponseInfo) error to func(context.Context, ./v2/x/bsonx/bsoncore.Document, ResponseInfo) error
ResponseInfo.ServerResponse: removed

compatible changes

(*Batches).AdvanceBatches: added
(*Batches).AppendBatchArray: added
(*Batches).AppendBatchSequence: added
(*Batches).IsOrdered: added
(*Batches).Size: added
ExtractCursorDocument: added
ResponseInfo.Error: added

./v2/x/mongo/driver/session

incompatible changes

Client.RetryRead: removed
Client.RetryWrite: removed

./v2/x/mongo/driver/wiremessage

compatible changes

DocumentSequenceToArray: added

@@ -398,7 +398,7 @@ func TestClientSideEncryptionCustomCrypt(t *testing.T) {
"expected 0 calls to DecryptExplicit, got %v", cc.numDecryptExplicitCalls)
assert.Equal(mt, cc.numCloseCalls, 0,
"expected 0 calls to Close, got %v", cc.numCloseCalls)
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 2,
assert.Equal(mt, cc.numBypassAutoEncryptionCalls, 1,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only call it once after the operation.go refactoring.

// A top-level error that occurred when attempting to communicate with the server
// or execute the bulk write. This value may not be populated if the exception was
// thrown due to errors occurring on individual writes.
TopLevelError *WriteError
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot use Error as a field name because of the conflict with the conventional method name.

@qingyang-hu qingyang-hu marked this pull request as ready for review November 5, 2024 20:18
@qingyang-hu qingyang-hu added priority-2-medium Medium Priority PR for Review and removed priority-3-low Low Priority PR for Review labels Nov 5, 2024
@@ -612,6 +612,56 @@ func (bwe BulkWriteException) HasErrorCodeWithMessage(code int, message string)
// serverError implements the ServerError interface.
func (bwe BulkWriteException) serverError() {}

// ClientBulkWriteException is the error type returned by ClientBulkWrite operations.
type ClientBulkWriteException struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should add a compile check for the bulk write exception:

var _ error = &ClientBulkWriteException{}

@@ -894,6 +892,86 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
}
}

// BulkWrite performs a client-level bulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method should not panic when models=nil:

client, err := mongo.Connect()
if err != nil {
	panic(err)
}

defer func() { _ = client.Disconnect(context.Background()) }()

_, err = client.BulkWrite(context.Background(), nil) // Should not panic 

Suggest propagating mongo.ErrNilValue and adding a unit test / extending the unified spec tests.

// BulkWrite performs a client-level bulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
opts ...options.Lister[options.ClientBulkWriteOptions]) (*ClientBulkWriteResult, error) {
// TODO: Remove once DRIVERS-2888 is implemented.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The corresponding GODRIVER ticket should be included in the TODO:

Suggested change
// TODO: Remove once DRIVERS-2888 is implemented.
// TODO(GODRIVER-3403): Remove after support for QE with Client.bulkWrite.

}

// AppendInsertOne appends ClientInsertOneModels.
func (m *ClientWriteModels) AppendInsertOne(database, collection string, models ...*ClientInsertOneModel) *ClientWriteModels {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest abstracting the Append* methods:

type clientBulkWriteModel interface {
	ClientInsertOneModel
}

// appendModels is a helper function to append models to ClientWriteModels.
func appendModels[T clientBulkWriteModel](m *ClientWriteModels, database, collection string, models []*T) *ClientWriteModels {
	if m == nil {
		m = &ClientWriteModels{}
	}
	for _, model := range models {
		m.models = append(m.models, clientWriteModel{
			namespace: fmt.Sprintf("%s.%s", database, collection),
			model:     model,
		})
	}
	return m
}

}
type clientWriteModel struct {
namespace string
model interface{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add stronger type constraints to this?

type clientBulkWriteModel interface {
	ClientInsertOneModel // etc.
}

type clientWriteModel struct {
	namespace string
	model     clientBulkWriteModel
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need an additional abstraction for an un-exported struct.

@@ -13,6 +13,51 @@ import (
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/operation"
)

// ClientBulkWriteResult is the result type returned by a client-level BulkWrite operation.
type ClientBulkWriteResult struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not include Acknowledged in the spirit of GODRIVER-2821 ?

}

// ClientInsertResult is the result type returned by a client-level bulk write of InsertOne operation.
type ClientInsertResult struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest naming this ClientBulkWriteInsertOneResult to avoid confusion.

}

// ClientUpdateResult is the result type returned from a client-level bulk write of UpdateOne, UpdateMany, and ReplaceOne operation.
type ClientUpdateResult struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest naming this to ClientBulkWriteUpdateResult

}

// ClientDeleteResult is the result type returned by a client-level bulk write DeleteOne and DeleteMany operation.
type ClientDeleteResult struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest naming this to ClientBulkWriteDeleteResult

}

// Error implements the error interface.
func (bwe ClientBulkWriteException) Error() string {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function doesn't return an error if the write is unacknowledged. The specifications required that users be able to discern whether a BulkWriteResult contains acknowledged results. Either return an error indicating an unacknowledged result, or update ClientBulkWriteResult in the spirit of GODRIVER-2821.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can test this with the following:

package main

import (
	"context"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/mongo"
	"go.mongodb.org/mongo-driver/v2/mongo/options"
	"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
)

func main() {
	client, err := mongo.Connect()
	if err != nil {
		panic(err)
	}

	defer func() { _ = client.Disconnect(context.Background()) }()

	pairs := &mongo.ClientWriteModels{}

	insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})

	opts := options.ClientBulkWrite().SetWriteConcern(writeconcern.Unacknowledged()).SetOrdered(false)

	pairs = pairs.AppendInsertOne("db", "k", insertOneModel)
	_, err = client.BulkWrite(context.Background(), pairs, opts) // Should not panic
	if err != nil {
		panic(err)
	}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a unified spec test that covers this case? If not we should add one / add an integration test.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
priority-2-medium Medium Priority PR for Review
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants